package com.my.service.task.map;

import com.alibaba.fastjson.JSONObject;
import com.my.service.task.entity.ChaomanAndWomenInfo;
import com.my.service.task.util.HbaseUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.*;

/**
 * Accumulates, per user, how many "chao" (trendy-brand) purchases fall into each
 * {@code chaotype}, persists the running totals in HBase, and — once the dominant
 * total exceeds {@link #CLASSIFY_THRESHOLD} — emits classification records for a
 * downstream reduce.
 *
 * <p>For each input element the function:
 * <ol>
 *   <li>counts the entries of {@code getList()} by {@code getChaotype()};</li>
 *   <li>adds the totals previously stored in HBase (JSON map in column
 *       {@code userbehavior:chaomanandwomen}) and writes the merged map back;</li>
 *   <li>compares the totals under keys {@code "1"} (man) and {@code "2"} (women);
 *       if the larger exceeds the threshold, stores the resulting flag in column
 *       {@code userbehavior:chaotype} and emits a record with count {@code 1} for
 *       the flag, preceded by a record with count {@code -1} for the previously
 *       stored flag when it differs.</li>
 * </ol>
 */
public class ChaomanAndwomenbyreduceMap implements FlatMapFunction<ChaomanAndWomenInfo,ChaomanAndWomenInfo>  {

    private static final String TABLE_NAME = "userflaginfo";
    private static final String FAMILY_NAME = "userbehavior";
    /** Column holding the JSON-serialized per-chaotype purchase counts. */
    private static final String COUNTS_COLUMN = "chaomanandwomen";
    /** Column holding the user's current classification flag ("man" / "women"). */
    private static final String TYPE_COLUMN = "chaotype";

    private static final String MAN_KEY = "1";
    private static final String WOMEN_KEY = "2";
    private static final String MAN_FLAG = "man";
    private static final String WOMEN_FLAG = "women";

    /** A user is only classified once the dominant count is strictly greater than this. */
    private static final long CLASSIFY_THRESHOLD = 2000L;
    private static final String GROUP_SUFFIX = "==chaomanAndWomenInforeduce";

    /**
     * Merges the purchase counts of one user into HBase and emits classification deltas.
     *
     * @param chaomanAndWomenInfo aggregated record for one user; its {@code getList()}
     *                            holds the individual entries to count
     * @param collector           receives zero, one or two classification records
     * @throws Exception propagated from the HBase helper or JSON handling
     */
    @Override
    public void flatMap(ChaomanAndWomenInfo chaomanAndWomenInfo, Collector<ChaomanAndWomenInfo> collector) throws Exception {
        Map<String, Long> resultMap = countByType(chaomanAndWomenInfo.getList());
        if (resultMap.isEmpty()) {
            // Nothing new to record: as before, neither HBase nor the collector is touched.
            return;
        }

        String rowkey = chaomanAndWomenInfo.getUserid() + "";

        // If HBase already holds a counts map for this userid, fold it into the new counts.
        // Presumably getdata returns null/blank when the cell does not exist — confirm in HbaseUtils.
        String data = HbaseUtils.getdata(TABLE_NAME, rowkey, FAMILY_NAME, COUNTS_COLUMN);
        mergeStoredCounts(data, resultMap);

        // Persist the updated per-type purchase counts (both man and women totals).
        HbaseUtils.putdata(TABLE_NAME, rowkey, FAMILY_NAME, COUNTS_COLUMN, JSONObject.toJSONString(resultMap));

        long chaoman = resultMap.getOrDefault(MAN_KEY, 0L);
        long chaowomen = resultMap.getOrDefault(WOMEN_KEY, 0L);
        // Ties resolve to "women", matching the original comparison (strictly greater for "man").
        String flag = chaoman > chaowomen ? MAN_FLAG : WOMEN_FLAG;
        long finalcount = Math.max(chaoman, chaowomen);

        // Only once the dominant count exceeds the threshold is the user classified
        // and a record produced.
        if (finalcount <= CLASSIFY_THRESHOLD) {
            return;
        }

        String type = HbaseUtils.getdata(TABLE_NAME, rowkey, FAMILY_NAME, TYPE_COLUMN);
        if (StringUtils.isNotBlank(type) && !type.equals(flag)) {
            // The user switched category: emit a -1 record for the previously stored flag.
            ChaomanAndWomenInfo previous = new ChaomanAndWomenInfo();
            previous.setChaotype(type);
            previous.setCount(-1L);
            previous.setGroupbyfield(type + GROUP_SUFFIX);
            collector.collect(previous);
        }

        HbaseUtils.putdata(TABLE_NAME, rowkey, FAMILY_NAME, TYPE_COLUMN, flag);

        // NOTE(review): a +1 record is emitted even when the stored type already equals
        // the flag; if the downstream reduce keeps a cumulative total this may double
        // count the user — confirm intended semantics before changing.
        ChaomanAndWomenInfo current = new ChaomanAndWomenInfo();
        current.setChaotype(flag);
        current.setCount(1L);
        current.setGroupbyfield(flag + GROUP_SUFFIX);
        collector.collect(current);
    }

    /**
     * Counts the entries by their {@code getChaotype()} value.
     *
     * @param entries entries to count; {@code null} is treated as empty
     * @return mutable map from chaotype to number of occurrences
     */
    private static Map<String, Long> countByType(Iterable<? extends ChaomanAndWomenInfo> entries) {
        Map<String, Long> counts = new HashMap<>();
        if (entries == null) {
            return counts;
        }
        for (ChaomanAndWomenInfo entry : entries) {
            counts.merge(entry.getChaotype(), 1L, Long::sum);
        }
        return counts;
    }

    /**
     * Adds every numeric value of the stored JSON counts map into {@code resultMap}.
     *
     * <p>All stored keys are merged, not just those present in the current batch, so
     * that writing {@code resultMap} back does not discard counts for other types.
     * Values are read as {@link Number} because the JSON parser may hand back
     * {@code Integer} rather than {@code Long} for small values; non-numeric values
     * are ignored.
     *
     * @param data      JSON object text previously stored in HBase; may be null or blank
     * @param resultMap counts of the current batch, updated in place
     */
    private static void mergeStoredCounts(String data, Map<String, Long> resultMap) {
        if (StringUtils.isBlank(data)) {
            return;
        }
        JSONObject stored = JSONObject.parseObject(data);
        if (stored == null) {
            return;
        }
        for (Map.Entry<String, Object> entry : stored.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof Number) {
                resultMap.merge(entry.getKey(), ((Number) value).longValue(), Long::sum);
            }
        }
    }

}
